/*
 * Copyright 2023 Hazelcast Inc.
 *
 * Licensed under the Hazelcast Community License (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://hazelcast.com/hazelcast-community-license
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.hazelcast.jet.obs;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.map.IMap;
import com.obs.services.ObsClient;

import java.nio.charset.Charset;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
 * @author: wengruichen
 * @since: 2023/8/21:16:58
 * @description: test
 */
public class ObsTest {
    /**
     * 字符集
     */
    private static final ILogger LOGGER = Logger.getLogger(ObsTest.class);
    private static final Charset CHARSET = UTF_8;
    /**
     * OBS AK
     */
    private static final String ACCESS_KEY = "your access key";
    /**
     * OBS SK
     */
    private static final String SECURITY_KEY = "your security key";
    /**
     * OBS URL
     */
    private static final String ENDPOINT = "obs.{region}.myhuaweicloud.com";
    /**
     * OBS bucket name
     */
    private static final String BUCKET_NAME = "bucket name";
    /**
     * OBS prefix
     */
    private static final String PREFIX = "prefix";

    public static void main(String[] str) {
        HazelcastInstance hz = Hazelcast.bootstrappedInstance();
        putData2ObsByMap(hz);
        getData2MapFromObs(hz);
    }

    /**
     * get data from hazelcast map and put them into obs
     *
     * @param hz HazelcastInstance
     */
    public static void putData2ObsByMap(final HazelcastInstance hz) {
        IMap<String, String> hzMap = hz.getMap("map");
        hzMap.put("key1", "value1");
        hzMap.put("key2", "value2");
        hzMap.put("key3", "value3");

        Pipeline p = Pipeline.create();
        p.readFrom(Sources.map(hzMap))
                .writeTo(ObsSinks.obs(BUCKET_NAME, PREFIX, CHARSET, () ->
                        new ObsClient(ACCESS_KEY, SECURITY_KEY, ENDPOINT), Object::toString));

        hz.getJet().newJob(p).join();
    }

    /**
     * get data from obs and put them into hazelcast map
     *
     * @param hz HazelcastInstance
     */
    public static void getData2MapFromObs(final HazelcastInstance hz) {
        IMap<String, String> hzMap = hz.getMap("map");

        List<String> nameList = new ArrayList<>();
        nameList.add(BUCKET_NAME);

        Pipeline p1 = Pipeline.create();
        p1.readFrom(ObsSources.obs(nameList, PREFIX, () -> new ObsClient(ACCESS_KEY, SECURITY_KEY, ENDPOINT)))
                .map(mapString -> {
                    String[] split = mapString.split("=");
                    String key = split[0];
                    String value = split[1];
                    LOGGER.info("Key: " + key + ", Value: " + value);
                    return new AbstractMap.SimpleEntry<>(key, value);
                }).writeTo(Sinks.map(hzMap));

        hz.getJet().newJob(p1).join();
    }

}

